package main

import (
	"context"
	"github.com/Shopify/sarama"
	"github.com/go-kit/kit/log"
	"github.com/go-kit/kit/log/level"
	"github.com/olivere/elastic/v7"
	"gopkg.in/yaml.v3"
	"io/ioutil"
	"os"
	"strings"
	"time"
)

// Retry tuning shared across the package. Neither constant is referenced in
// this file; presumably they bound retries elsewhere in package main —
// TODO confirm against the callers.
const (
	RetryCount = 3               // number of attempts
	WaitTime   = 2 * time.Second // fixed two-second duration
)

// Package-level plumbing shared by the components that main wires together.
var (
	// topics is the list of Kafka topics handed to Consume; init fills it
	// with cfg.MessageTopic once the configuration has been loaded.
	topics []string

	// messageChan (capacity 100) is shared between the consumer group
	// handler and the worker message repository.
	messageChan = make(chan *OffsetMessage, 100)

	// esMessageChan (capacity 100) is shared between the worker message
	// repository and the Elasticsearch message repository.
	esMessageChan = make(chan *ESMessage, 100)

	// ackChan (capacity 100) is handed to the consumer group handler, the
	// worker message repository and the Elasticsearch message repository.
	ackChan = make(chan *MessageInfo, 100)
)
// cfg holds the process configuration. It starts out zero-valued and is
// populated by init from config.yaml; the yaml tags name the expected keys.
var cfg struct {
	// KafkaDSN is the single broker address passed to the consumer group.
	KafkaDSN string `yaml:"kafkaDSN"`

	// ConsumerGroupId is the Kafka consumer group the process joins.
	ConsumerGroupId string `yaml:"consumerGroupId"`

	// ElasticDSN is the URL the Elasticsearch client is created with.
	ElasticDSN string `yaml:"elasticDSN"`

	// MessageTopic is the Kafka topic that gets consumed.
	MessageTopic string `yaml:"messageTopic"`

	// LogTopic is loaded from the configuration but not referenced in this
	// file.
	LogTopic string `yaml:"logTopic"`

	// AlarmMessageIndex is passed to the Elasticsearch message repository.
	AlarmMessageIndex string `yaml:"alarmMessageIndex"`

	// PipelineWorkers lists the worker API endpoints; main starts one
	// ParseMessage goroutine per entry.
	PipelineWorkers []string `yaml:"pipelineWorkers"`
}

// init loads config.yaml from the working directory into cfg and derives the
// topic list from it. A missing/unreadable file or invalid YAML aborts
// start-up with a panic.
func init() {
	raw, readErr := ioutil.ReadFile("config.yaml")
	if readErr != nil {
		panic(readErr)
	}

	if parseErr := yaml.Unmarshal(raw, &cfg); parseErr != nil {
		panic(parseErr)
	}

	// Only the message topic is subscribed to.
	topics = []string{cfg.MessageTopic}
}

// kafkaConfig returns the sarama configuration used for the consumer group:
// sarama's defaults with the protocol version pinned to 2.4.0, the initial
// offset set to the oldest one, and consumer errors returned on the Errors
// channel.
func kafkaConfig() *sarama.Config {
	c := sarama.NewConfig()
	c.Version = sarama.V2_4_0_0

	consumer := &c.Consumer
	consumer.Return.Errors = true
	consumer.Offsets.Initial = sarama.OffsetOldest

	return c
}

// NewLogger builds the process logger: logfmt output to stderr behind a
// synchronized writer, with every record carrying svc=controller, a UTC
// timestamp (ts) and the caller location, in that order.
func NewLogger() log.Logger {
	stderr := log.NewSyncWriter(os.Stderr)

	// A single With call appends the key/value pairs in the order given.
	return log.With(
		log.NewLogfmtLogger(stderr),
		"svc", "controller",
		"ts", log.DefaultTimestampUTC,
		"caller", log.DefaultCaller,
	)
}

// GetWorkerAPIList returns the worker API endpoints configured under
// pipelineWorkers. The returned slice is the one stored in cfg, not a copy.
func GetWorkerAPIList() []string {
	workers := cfg.PipelineWorkers
	return workers
}

// main wires the pipeline together and then consumes from Kafka until a fatal
// error occurs.
//
// Start-up order:
//  1. create the Kafka consumer group and start draining its error channel;
//  2. start one ParseMessage goroutine per configured worker API;
//  3. create the Elasticsearch client and start the SaveMessage goroutine;
//  4. call Consume in a loop.
//
// Any failure to create a client, and any error returned by Consume, is
// logged at error level and terminates the process with exit status 1.
func main() {
	logger := NewLogger()

	consumerGroup, err := sarama.NewConsumerGroup([]string{cfg.KafkaDSN}, cfg.ConsumerGroupId, kafkaConfig())
	if err != nil {
		_ = level.Error(logger).Log("exit", err)
		os.Exit(1)
	}

	// kafkaConfig enables Consumer.Return.Errors, so sarama hands consumer
	// errors to this channel instead of logging them itself. The channel must
	// be drained: otherwise the errors are silently lost and, once its buffer
	// is full, the consumer blocks trying to report the next one.
	go func() {
		for consumeErr := range consumerGroup.Errors() {
			_ = level.Error(logger).Log("consumerError", consumeErr)
		}
	}()

	cg := NewConsumerGroupHandler(logger, messageChan, ackChan)

	workerMessageRepository := NewWorkerMessageRepository(logger, messageChan, esMessageChan, ackChan)
	for _, API := range GetWorkerAPIList() {
		go workerMessageRepository.ParseMessage(API)
		// Log only the part of the endpoint that precedes the task path.
		_ = logger.Log("WorkerStart", strings.Split(API, "/api/v1/task")[0])
	}

	elasticClient, err := elastic.NewSimpleClient(elastic.SetURL(cfg.ElasticDSN))
	if err != nil {
		_ = level.Error(logger).Log("exit", err)
		os.Exit(1)
	}

	esr := NewESMessageRepository(logger, elasticClient, esMessageChan, ackChan, cfg.AlarmMessageIndex)
	go esr.SaveMessage()

	// Consume returns whenever the current session ends, so it is called
	// again for as long as it does not report an error.
	for {
		if err := consumerGroup.Consume(context.Background(), topics, cg); err != nil {
			_ = level.Error(logger).Log("exit", err)
			os.Exit(1)
		}
	}
}
